#  Licensed to Elasticsearch B.V. under one or more contributor
#  license agreements. See the NOTICE file distributed with
#  this work for additional information regarding copyright
#  ownership. Elasticsearch B.V. licenses this file to you under
#  the Apache License, Version 2.0 (the "License"); you may
#  not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
# 	http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing,
#  software distributed under the License is distributed on an
#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#  KIND, either express or implied.  See the License for the
#  specific language governing permissions and limitations
#  under the License.

import pickle
from datetime import date
from typing import Any, Dict

from pytest import fixture, raises

from elasticsearch_dsl import Date, Document, Object, Search, response
from elasticsearch_dsl.aggs import Terms
from elasticsearch_dsl.response.aggs import AggResponse, Bucket, BucketData
from elasticsearch_dsl.utils import AttrDict


@fixture
def agg_response(aggs_search: Search, aggs_data: Dict[str, Any]) -> response.Response:
    return response.Response(aggs_search, aggs_data)


def test_agg_response_is_pickleable(agg_response: response.Response) -> None:
    agg_response.hits
    r = pickle.loads(pickle.dumps(agg_response))

    assert r == agg_response
    assert r._search == agg_response._search
    assert r.hits == agg_response.hits


def test_response_is_pickleable(dummy_response: Dict[str, Any]) -> None:
    res = response.Response(Search(), dummy_response.body)  # type: ignore[attr-defined]
    res.hits
    r = pickle.loads(pickle.dumps(res))

    assert r == res
    assert r._search == res._search
    assert r.hits == res.hits


def test_hit_is_pickleable(dummy_response: Dict[str, Any]) -> None:
    res = response.Response(Search(), dummy_response)
    hits = pickle.loads(pickle.dumps(res.hits))

    assert hits == res.hits
    assert hits[0].meta == res.hits[0].meta


def test_response_stores_search(dummy_response: Dict[str, Any]) -> None:
    s = Search()
    r = response.Response(s, dummy_response)

    assert r._search is s


def test_attribute_error_in_hits_is_not_hidden(dummy_response: Dict[str, Any]) -> None:
    def f(hit: AttrDict[Any]) -> Any:
        raise AttributeError()

    s = Search().doc_type(employee=f)
    r = response.Response(s, dummy_response)
    with raises(TypeError):
        r.hits


def test_interactive_helpers(dummy_response: Dict[str, Any]) -> None:
    res = response.Response(Search(), dummy_response)
    hits = res.hits
    h = hits[0]

    rhits = (
        "[<Hit(test-index/elasticsearch): {}>, <Hit(test-index/42): {}...}}>, "
        "<Hit(test-index/47): {}...}}>, <Hit(test-index/53): {{}}>]"
    ).format(
        repr(dummy_response["hits"]["hits"][0]["_source"]),
        repr(dummy_response["hits"]["hits"][1]["_source"])[:60],
        repr(dummy_response["hits"]["hits"][2]["_source"])[:60],
    )

    assert res
    assert f"<Response: {rhits}>" == repr(res)
    assert rhits == repr(hits)
    assert {"meta", "city", "name"} == set(dir(h))
    assert "<Hit(test-index/elasticsearch): %r>" % dummy_response["hits"]["hits"][0][
        "_source"
    ] == repr(h)


def test_empty_response_is_false(dummy_response: Dict[str, Any]) -> None:
    dummy_response["hits"]["hits"] = []
    res = response.Response(Search(), dummy_response)

    assert not res


def test_len_response(dummy_response: Dict[str, Any]) -> None:
    res = response.Response(Search(), dummy_response)
    assert len(res) == 4


def test_iterating_over_response_gives_you_hits(dummy_response: Dict[str, Any]) -> None:
    res = response.Response(Search(), dummy_response)
    hits = list(h for h in res)

    assert res.success()
    assert 123 == res.took
    assert 4 == len(hits)
    assert all(isinstance(h, response.Hit) for h in hits)
    h = hits[0]

    assert "test-index" == h.meta.index
    assert "company" == h.meta.doc_type
    assert "elasticsearch" == h.meta.id
    assert 12 == h.meta.score

    assert hits[1].meta.routing == "elasticsearch"


def test_hits_get_wrapped_to_contain_additional_attrs(
    dummy_response: Dict[str, Any]
) -> None:
    res = response.Response(Search(), dummy_response)
    hits = res.hits

    assert 123 == hits.total  # type: ignore[attr-defined]
    assert 12.0 == hits.max_score  # type: ignore[attr-defined]


def test_hits_provide_dot_and_bracket_access_to_attrs(
    dummy_response: Dict[str, Any]
) -> None:
    res = response.Response(Search(), dummy_response)
    h = res.hits[0]

    assert "Elasticsearch" == h.name
    assert "Elasticsearch" == h["name"]

    assert "Honza" == res.hits[2].name.first

    with raises(KeyError):
        h["not_there"]

    with raises(AttributeError):
        h.not_there


def test_slicing_on_response_slices_on_hits(dummy_response: Dict[str, Any]) -> None:
    res = response.Response(Search(), dummy_response)

    assert res[0] is res.hits[0]
    assert res[::-1] == res.hits[::-1]


def test_aggregation_base(agg_response: response.Response) -> None:
    assert agg_response.aggs is agg_response.aggregations
    assert isinstance(agg_response.aggs, response.AggResponse)


def test_metric_agg_works(agg_response: response.Response) -> None:
    assert 25052.0 == agg_response.aggs.sum_lines.value


def test_aggregations_can_be_iterated_over(agg_response: response.Response) -> None:
    aggs = [a for a in agg_response.aggs]

    assert len(aggs) == 3
    assert all(map(lambda a: isinstance(a, AggResponse), aggs))


def test_aggregations_can_be_retrieved_by_name(
    agg_response: response.Response, aggs_search: Search
) -> None:
    a = agg_response.aggs["popular_files"]

    assert isinstance(a, BucketData)
    assert isinstance(a._meta["aggs"], Terms)
    assert a._meta["aggs"] is aggs_search.aggs.aggs["popular_files"]


def test_bucket_response_can_be_iterated_over(agg_response: response.Response) -> None:
    popular_files = agg_response.aggregations.popular_files

    buckets = [b for b in popular_files]
    assert all(isinstance(b, Bucket) for b in buckets)
    assert buckets == popular_files.buckets


def test_bucket_keys_get_deserialized(
    aggs_data: Dict[str, Any], aggs_search: Search
) -> None:
    class Commit(Document):
        info = Object(properties={"committed_date": Date()})

        class Index:
            name = "test-commit"

    aggs_search = aggs_search.doc_type(Commit)
    agg_response = response.Response(aggs_search, aggs_data)

    per_month = agg_response.aggregations.per_month
    for b in per_month:
        assert isinstance(b.key, date)
